iT邦幫忙

2021 iThome 鐵人賽

DAY 19
0
Software Development

從零開始Reactive Programming- Spring系列 第 20

[Day 19] Reactive Programming - Reactor (operator fusion)

  • 分享至 

  • xImage
  •  

前言

這篇掙扎了很久要不要寫,算是進階一點的主題,內容雖然不多,但已經讓我絞盡腦汁,關於這個主題我自己也還是有不了解的部分,但有鑑於蠻多影片都會提到operator fusion,決定還是來介紹一下大致上的意思,至於內部的細節只能待高手來補了。基本上就是參考整理並簡化了這篇原文David Karnok Operator-fusion (Part 1),簡中翻譯Piasy

upslash

圖片來源:upslash/greg

operator fusion

operator就是我們常用到的map、filter、merge等操作Reactive stream的方式,fusion則是融合,簡而言之就是透過不同的方式融合operator來達到增加效能節省時間或空間的功效,就稱為 operator fusion,融合的方式又分為兩大類Macro-fusionMicro-fusion,Macro的意思是宏觀,Macro-fusion就是將多個operator合併為單個,反之Micro則是微觀,Micro-fusion就是將各operator內部用到的資源共用。

歷史

reactive programming 是不斷的在演化的,而reactive的歷程分為四代

  • 第0代:盤古開天時期,只有java.util.Observable API,就跟之前介紹Observer pattern差不多,沒有中間層而且無法組合。
  • 第1代:一旦發布無法取消,Producer 和 Consumer無法溝通,若是速率不一致將導致backpressure問題。
  • 第2代:Subscriber可自行決定是否還需要資料,雙方多了溝通管道來支援backpressure
  • 第3代:各公司合作統一定義了Reactive-Streams Spec,彼此兼容。
  • 第4代:operator fusion
  • 第5代:未來目標(2016年),reactive IO.....等新特性。

Macro-fusion

合併或更換operator

之前介紹到publishOn/subscribeOn,如果在source operator(just、range.....)之後是subscribeOn意義就不大,這時候替換publishOn就會更好一些。

修改operator參數

可以考慮將性質差不多的operator合併提升效能。

Flux.range(1, 10) 
   .filter(v -> v % 3 == 0) 
   .filter(v -> v % 2 == 0) 
   .map(v -> v + 1) 
   .map(v -> v * v) 
   .subscribe(System.out::println);
Predicate<Integer> p1 = v -> v % 3 == 0; 
Predicate<Integer> p2 = v -> v % 2 == 0; 
Predicate<Integer> p3 = v -> p1.test(v) && p2.test(v);

Micro-fusion

有些operator內含佇列,有些operator需要傳入佇列,這時候如果能內部共用就可以節省記憶體的空間開校,也能節省不斷創建佇列的效能影響,

同步(Synchronous-fusion)

如果資料來源是同步的而且可以被視為佇列,像是range,fromIterable,fromArray,fromStreamfromCallable,just也算,但是just大部分是用在Macro-fusion,operator內部用到佇列的包含像是 observeOn(), flatMap()publish(), zip()等等,如果在onSubscribe()的時候,發現之前說明用來溝通上下游的Subscription有實作Queue interface,就直接使用而不需自行創建。來先看一下原始碼比較有感覺。
這個是Flux.range實際上的類別,可以看到有implements Fuseable,而他的doc就清楚的寫專門為了 Micro-fusion

final class FluxRange extends Flux<Integer>
  implements Fuseable, SourceProducer<Integer> {
}
/**
 * A micro API for stream fusion, in particular marks producers that support a {@link QueueSubscription}.
 */
public interface Fuseable {
}

flatMap實際上的class  的其中一個方法,從以下程式碼可以看到他直接使用而不是創建新的。
FluxFlatMap.java trySubscribeScalarMap(...)

if (!fuseableExpected || p instanceof Fuseable) {
 p.subscribe(s);
}
else {
 p.subscribe(new FluxHide.SuppressFuseableSubscriber<>(s));
}

原作者聲稱因為這樣的調整讓range().observeOn()的吞吐量從55M Ops/s ->200M Ops/s  提升了接近四倍。

結語

operator fusion可以有效降低reactive dataflows的效能開銷,主要的面向可能是library的開發人員,內容上的確也是比較艱深,我只有挑其中比較能理解的地方拿出來分享,希望以上的介紹可以讓大家對operator fusion有一定的了解,建議可以去看看原文還有提到很多不同的方式。


上一篇
[Day 18] Reactive Programming - Reactor Test(VirtualTime)
下一篇
[Day 20] Reactive Programming - Spring WebFlux
系列文
從零開始Reactive Programming- Spring32
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言